feat(datafusion): implement the project node to add the partition columns #1602
Conversation
Force-pushed from b3a8601 to 40a225a
…umns defined in Iceberg. Implement physical execution plan node that projects Iceberg partition columns from source data, supporting nested fields and all Iceberg transforms.
Force-pushed from 40a225a to 4d59f87
let field_path = Self::find_field_path(&self.table_schema, source_field.id)?;
let index_path = Self::resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?;

let source_column = Self::extract_column_by_index_path(batch, &index_path)?;
This looks very interesting! I actually came across a similar issue when implementing the sort node, and I was leaning toward implementing a new SchemaWithPartnerVisitor, wdyt?
Perfect 👌
I was initially thinking this was needed just for this implementation, but it seems the right place would be closer to the Schema definition. Since this is a standard method for accessing column values by index, it makes sense to generalize!
I drafted a PartitionValueVisitor here to help extract partition values from a record batch in tree-traversal style. Please let me know what you think!
I just saw this implementation to extract partition values, and it actually makes more sense to me because it leverages the existing RecordBatchProjector: #1040
Good, thanks for sharing. I will use #1040 when merged!
Hey @CTTY 👋,
I can use it now, but I have one concern about leveraging RecordBatchPartitionSplitter: it relies on PARQUET_FIELD_ID_META_KEY. Since DataFusion doesn't use this key, do you think we should adapt this method to make it compatible with DataFusion?
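For context, a minimal sketch of what that metadata looks like on an Arrow field, assuming the arrow_schema crate (the helper name is made up): Iceberg's Parquet reader attaches "PARQUET:field_id" to each field's metadata, which is exactly what a schema built by an arbitrary DataFusion plan generally lacks.

use std::collections::HashMap;

use arrow_schema::{DataType, Field};

// Hypothetical helper mimicking what Iceberg's Parquet reader produces: an
// Arrow field whose metadata carries the Iceberg field id under
// "PARQUET:field_id" (PARQUET_FIELD_ID_META_KEY). Schemas coming out of an
// arbitrary DataFusion plan typically do not carry this entry.
fn field_with_iceberg_id(name: &str, data_type: DataType, field_id: i32) -> Field {
    Field::new(name, data_type, true).with_metadata(HashMap::from([(
        "PARQUET:field_id".to_string(),
        field_id.to_string(),
    )]))
}

fn main() {
    let field = field_with_iceberg_id("city", DataType::Utf8, 12);
    assert_eq!(
        field.metadata().get("PARQUET:field_id").map(String::as_str),
        Some("12")
    );
}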
}

/// Find the path to a field by its ID (e.g., ["address", "city"]) in the Iceberg schema
fn find_field_path(table_schema: &Schema, field_id: i32) -> DFResult<Vec<String>> {
…n containing all the partitions values
Force-pushed from 7f8404f to 7be558d
Force-pushed from 57fe2dd to bc805db
Hi @fvaleye, is this PR ready for review, or does it still need some work?
Hi @liurenjie1024 👋, yes, it's ready for review. However, it might require additional refactoring if we want to make these utility functions more general.
Thanks @fvaleye for this PR! I left some comments to improve it, and I still have other questions:
- What's the entry point of this module?
- Could the entry point of this module be a function like the one sketched below?
fn project_with_partition(input: &ExecutionPlan, table: &Table) -> Result<Arc<dyn ExecutionPlan>> {
    // This method extends `input` with an extra `PhysicalExpr`, which calculates the partition value.
    ...
}
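A rough sketch of how such an entry point could look, assuming DataFusion's ProjectionExec and Column; the partition expression is treated as an already-built PhysicalExpr, and the "_partition" output name is only illustrative.

use std::sync::Arc;

use datafusion::common::Result as DFResult;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};

// Keep every input column as-is and append one extra expression that computes
// the Iceberg partition value. `partition_expr` stands in for the partition
// expression this PR builds; how it is derived from the table metadata is out
// of scope here.
fn project_with_partition(
    input: Arc<dyn ExecutionPlan>,
    partition_expr: Arc<dyn PhysicalExpr>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
    let mut exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = input
        .schema()
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| {
            (
                Arc::new(Column::new(field.name(), i)) as Arc<dyn PhysicalExpr>,
                field.name().clone(),
            )
        })
        .collect();
    exprs.push((partition_expr, "_partition".to_string()));
    Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
}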
/// Extract a column from a record batch by following an index path.
/// The index path specifies the column indices to traverse for nested structures.
#[allow(dead_code)]
fn extract_column_by_index_path(batch: &RecordBatch, index_path: &[usize]) -> DFResult<ArrayRef> {
Could we reuse RecordBatchProjector?
I tried, but I kept this implementation; the main reasons are below:
1. Metadata dependency:
- RecordBatchProjector depends on Arrow field metadata containing PARQUET:field_id.
- This metadata is added when reading Parquet files through Iceberg's reader.
- DataFusion ExecutionPlans might not always have this metadata preserved.
2. Using the Iceberg table's schema directly:
- We resolve field paths using field names, not IDs.
- This works regardless of whether Arrow metadata is present.
Depending on what you think:
- We could keep this implementation working with DataFusion (a sketch of the name-based lookup follows below).
- We could readapt RecordBatchProjector, but it feels like it's not the same intent.
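To make point 2 concrete, here is a small self-contained sketch of name-based path resolution; the helper name is hypothetical, and it returns None instead of a DataFusion error to stay standalone.

use arrow_schema::{DataType, Schema};

// Turn a field path like ["address", "city"] into the column indices to
// follow through nested structs, without relying on any field-id metadata.
fn resolve_index_path(schema: &Schema, field_path: &[String]) -> Option<Vec<usize>> {
    let mut indices = Vec::with_capacity(field_path.len());
    // Top-level lookup on the input schema.
    let (first_idx, mut field) = schema.column_with_name(field_path.first()?)?;
    indices.push(first_idx);
    // Descend into nested structs by name for the remaining path segments.
    for name in &field_path[1..] {
        match field.data_type() {
            DataType::Struct(children) => {
                let idx = children.iter().position(|child| child.name() == name)?;
                indices.push(idx);
                field = children[idx].as_ref();
            }
            _ => return None,
        }
    }
    Some(indices)
}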
I'm not convinced. There are two ways to solve your issue:
- Add a constructor in RecordBatchProjector that accepts the Iceberg schema and the target field IDs.
- Convert the Iceberg schema to an Arrow schema; the converter will add the field_id metadata.
Personally I prefer approach 1, but I don't have a strong opinion about it. After using RecordBatchProjector, the whole PR could be simplified a lot.
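A sketch of option 2, assuming iceberg's schema_to_arrow_schema converter; option 1 is the shape that shows up later in this thread as RecordBatchProjector::from_iceberg_schema_mapping.

use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::Schema;

// Converting the Iceberg schema to Arrow attaches the "PARQUET:field_id"
// metadata that RecordBatchProjector relies on, so the projector can be built
// against this converted schema instead of the raw DataFusion input schema.
fn arrow_schema_with_field_ids(table_schema: &Schema) -> iceberg::Result<arrow_schema::Schema> {
    schema_to_arrow_schema(table_schema)
}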
…use PhysicalExpr for partitions values calculation. Signed-off-by: Florian Valeye <[email protected]>
let field_path = find_field_path(&self.table_schema, source_field.id)?;
let index_path = resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?;
We don't need to do this for every batch.
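In other words (an illustrative sketch, not the PR's exact struct): the index paths depend only on the table schema and the input schema, so they can be resolved once when the node is constructed and merely read per batch.

// One resolved index path per partition field, built in the constructor
// (e.g. via the PR's find_field_path / resolve_arrow_index_path helpers).
struct PartitionProjectionState {
    index_paths: Vec<Vec<usize>>,
}

impl PartitionProjectionState {
    fn index_path(&self, partition_field_idx: usize) -> &[usize] {
        &self.index_paths[partition_field_idx]
    }
}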
let partition_value = transform_fn
    .transform(source_column)
    .map_err(to_datafusion_error)?;
let transform_fn = iceberg::transform::create_transform_function(&pf.transform)
Ditto, this only needs to be done once.
This is not resolved; we could create the transform functions in the constructor.
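Something along these lines could live in the constructor; a sketch assuming PartitionSpec::fields() and the accessor names visible in this diff (pf.transform, pf.source_id).

use iceberg::spec::PartitionSpec;
use iceberg::transform::{create_transform_function, BoxedTransformFunction};

// Build one transform function per partition field when the node is
// constructed, instead of calling `create_transform_function` for every
// record batch.
fn build_transform_functions(
    partition_spec: &PartitionSpec,
) -> iceberg::Result<Vec<BoxedTransformFunction>> {
    partition_spec
        .fields()
        .iter()
        .map(|pf| create_transform_function(&pf.transform))
        .collect()
}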
Signed-off-by: Florian Valeye <[email protected]>
    .map(|pf| pf.source_id)
    .collect();

let projector = RecordBatchProjector::from_iceberg_schema_mapping(
We don't need the first batch to get the input's schema, see: https://github.com/apache/datafusion/blob/921f4a028409f71b68bed7d05a348255bb6f0fba/datafusion/physical-plan/src/execution_plan.rs#L106
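For illustration, the schema is available straight from the child plan at construction time through the standard ExecutionPlan API, so the projector can be built up front rather than lazily from the first batch.

use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::physical_plan::ExecutionPlan;

// The input schema comes from the child plan itself, before any batch is read.
fn input_schema(input: &Arc<dyn ExecutionPlan>) -> SchemaRef {
    input.schema()
}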
…ation Signed-off-by: Florian Valeye <[email protected]>
Force-pushed from edb4719 to d4fd336
mod reader;
pub(crate) mod record_batch_projector;
/// RecordBatch projection utilities
pub mod record_batch_projector;
Why do we need to make this pub?
impl std::hash::Hash for PartitionExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        std::any::TypeId::of::<Self>().hash(state);
This is a little odd; why not derive Hash for PartitionValueCalculator?
I refactored to use this approach for both Hash and PartialEq.
For now, we can't derive Hash for PartitionValueCalculator because it contains:
- partition_type: DataType (Arrow's DataType does not implement Hash)
- projector: RecordBatchProjector (does not implement Hash)
- transform_functions: Vec<BoxedTransformFunction> (trait objects cannot be hashed)
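For readers following along, the general pattern looks like this, using an illustrative stand-in type rather than the PR's actual struct: hash a stable discriminant such as the TypeId plus the hashable members, and compare only the comparable members in PartialEq.

use std::hash::{Hash, Hasher};

// Stand-in type: one member is hashable and comparable, the other (a boxed
// closure) implements neither Hash nor PartialEq.
struct ExampleExpr {
    name: String,
    opaque: Box<dyn Fn(i64) -> i64>,
}

impl PartialEq for ExampleExpr {
    fn eq(&self, other: &Self) -> bool {
        // The opaque member is deliberately ignored.
        self.name == other.name
    }
}

impl Hash for ExampleExpr {
    fn hash<H: Hasher>(&self, state: &mut H) {
        std::any::TypeId::of::<Self>().hash(state);
        self.name.hash(state);
    }
}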
let input_schema = input.schema();
let partition_type = build_partition_type(partition_spec, table_schema.as_ref())?;
let calculator = PartitionValueCalculator::new(
This implicitly assumes that the input_schema exactly matches the Iceberg table schema. I think this assumption is valid for now, but we should add a check here to ensure that.
Yes, I can add a method for this.
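A possible shape for that check, sketched under the assumption that iceberg's schema_to_arrow_schema converter is available: compare field names and types, and deliberately ignore metadata, since DataFusion inputs usually do not carry the field-id entries.

use arrow_schema::Schema as ArrowSchema;
use datafusion::common::{DataFusionError, Result as DFResult};
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::Schema;

// Convert the Iceberg schema to Arrow and compare field names and types with
// the input plan's schema.
fn check_input_matches_table(input_schema: &ArrowSchema, table_schema: &Schema) -> DFResult<()> {
    let expected = schema_to_arrow_schema(table_schema)
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    if expected.fields().len() != input_schema.fields().len() {
        return Err(DataFusionError::Plan(
            "input schema does not match the Iceberg table schema".to_string(),
        ));
    }
    for (expected_field, actual_field) in expected.fields().iter().zip(input_schema.fields().iter()) {
        if expected_field.name() != actual_field.name()
            || expected_field.data_type() != actual_field.data_type()
        {
            return Err(DataFusionError::Plan(format!(
                "input column `{}` does not match the Iceberg table schema",
                actual_field.name()
            )));
        }
    }
    Ok(())
}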
…e schemas Signed-off-by: Florian Valeye <[email protected]>
Which issue does this PR close?
What changes are included in this PR?
Implement a physical execution plan node that projects Iceberg partition columns from source data, supporting nested fields and all Iceberg transforms.
Are these changes tested?
Yes, with unit tests